package top.houry.netty.barrage.kafka;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import top.houry.netty.barrage.entity.BarrageMsg;
import top.houry.netty.barrage.service.IBarrageMsgService;

/**
 * Kafka listener for the {@code quickstart-events} topic.
 *
 * <p>Each record value is parsed as JSON into a {@link BarrageMsg} and handed to
 * {@link IBarrageMsgService#saveBarrageMsg}. The record is acknowledged only after
 * that call returns without throwing.
 */
@Component
@Slf4j
public class BarrageKafkaConsumer {

    /** Service that receives every successfully parsed barrage message. */
    private IBarrageMsgService barrageMsgService;

    /**
     * Setter injection point for the barrage message service.
     *
     * @param barrageMsgService service used by {@link #processMessage} to store messages
     */
    @Autowired
    public void setBarrageMsgService(IBarrageMsgService barrageMsgService) {
        this.barrageMsgService = barrageMsgService;
    }

    /**
     * Handles a single record from the {@code quickstart-events} topic.
     *
     * <p>Records with a {@code null} value (e.g. tombstones) carry nothing to parse, so they
     * are logged and acknowledged instead of failing with a {@link NullPointerException}.
     * Any exception raised while parsing or saving is logged and the record is left
     * unacknowledged; this method never propagates an exception to the listener container.
     *
     * @param record the consumed record; its value's {@code toString()} is expected to be a
     *               JSON document describing a {@link BarrageMsg}
     * @param ack    acknowledgment handle, invoked once the message has been saved or skipped
     *               (NOTE(review): presumably the container runs in a manual ack mode — confirm)
     */
    @KafkaListener(topics = {"quickstart-events"})
    public void processMessage(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        try {
            // Parameterized logging formats the record only when INFO is enabled.
            log.info("BarrageKafkaConsumer-processMessage-接收到record:{}", record);

            Object payload = record.value();
            if (payload == null) {
                // Redelivery cannot make a null payload parseable, so skip it for good.
                log.warn("BarrageKafkaConsumer-processMessage-null record value skipped, topic:{}, partition:{}, offset:{}",
                        record.topic(), record.partition(), record.offset());
                ack.acknowledge();
                return;
            }

            BarrageMsg msg = JSONUtil.toBean(payload.toString(), BarrageMsg.class);
            barrageMsgService.saveBarrageMsg(msg);
            // Acknowledge only after the save call has returned normally.
            ack.acknowledge();
        } catch (Exception e) {
            // Listener boundary: log and leave the record unacknowledged.
            log.error("BarrageKafkaConsumer-processMessage", e);
        }
    }

}
